/**
 * Copyright [2020] [LiBo/Alex of copyright liboware@gmail.com ]
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hyts.stream.engine.stream.complete;

import com.hyts.stream.engine.model.TimeScaleMetrics;
import com.hyts.stream.engine.model.WordEvent;
import com.hyts.stream.engine.source.CustomSource;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.optimizer.operators.MapDescriptor;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @project-name:lhy-stream
 * @package-name:com.hyts.stream.engine.stream.complete
 * @author:LiBo/Alex
 * @create-date:2022-05-17 21:42
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description: Flink streaming job that builds one independent pipeline per time-format
 * pattern in {@link #strings}. Each pipeline reads {@link WordEvent}s from its own
 * {@link CustomSource}, tags them with the pattern, keys them by word + pattern, and sums
 * their counts into a {@link TimeScaleMetrics} per 3-second tumbling processing-time window.
 * The aggregated metrics are printed to standard output.
 */
public class MulltiKeyByStreamExecutor {

    /** Time-format patterns; {@link #main(String[])} registers one pipeline for each entry. */
    static String[] strings = new String[]{"yyyyMM","yyyyMMdd","yyyyMMddHH"};

    /**
     * Registers one pipeline per entry of {@link #strings} on a single execution environment
     * and then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws IllegalStateException if the Flink job fails; the original failure is the cause
     */
    public static void main(String[] args) {

        final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iterate over the array itself so the number of pipelines always matches the
        // number of configured formats.
        for (String format : strings) {
            registerPipeline(streamExecutionEnvironment, format);
        }

        try {
            streamExecutionEnvironment.execute();
        } catch (Exception e) {
            // Propagate the failure (with its cause) instead of only printing it, so the
            // process does not terminate as if the job had succeeded.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Adds a source -> map -> keyBy -> window -> aggregate -> sink pipeline for one format.
     *
     * @param environment the environment the pipeline is registered on
     * @param format      the time-format pattern this pipeline stamps on events and metrics
     */
    private static void registerPipeline(StreamExecutionEnvironment environment, final String format) {

        // NOTE(review): per the Flink Javadoc, allowedLateness is only meaningful for
        // event-time windows; with TumblingProcessingTimeWindows it likely has no effect.
        // It is kept to preserve the original pipeline definition - confirm the intent.
        WindowedStream<WordEvent, String, TimeWindow> windowedStream =
                environment.addSource(new CustomSource())
                        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                        .map(new RichMapFunction<WordEvent, WordEvent>() {
                            @Override
                            public WordEvent map(WordEvent value) throws Exception {
                                // Tag the event with this pipeline's format.
                                value.setFormat(format);
                                return value;
                            }
                        })
                        .keyBy(event -> event.getWord() + format)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                        .allowedLateness(Time.minutes(30));

        windowedStream.aggregate(new AggregateFunction<WordEvent, TimeScaleMetrics, TimeScaleMetrics>() {

            @Override
            public TimeScaleMetrics createAccumulator() {
                return new TimeScaleMetrics();
            }

            @Override
            public TimeScaleMetrics add(WordEvent value, TimeScaleMetrics accumulator) {
                // Running sum of the event counts seen for this key in the window.
                accumulator.setCount1(value.getCount() + accumulator.getCount1());
                accumulator.setTimeType(format);
                accumulator.setKey(value.getWord());
                return accumulator;
            }

            @Override
            public TimeScaleMetrics getResult(TimeScaleMetrics accumulator) {
                return accumulator;
            }

            @Override
            public TimeScaleMetrics merge(TimeScaleMetrics a, TimeScaleMetrics b) {
                // Combine the partial sums instead of returning null. Both accumulators
                // are produced by add() in this pipeline, so the key and time type
                // already stored on 'a' are retained.
                a.setCount1(a.getCount1() + b.getCount1());
                return a;
            }

        }).addSink(new SinkFunction<TimeScaleMetrics>() {

            @Override
            public void invoke(TimeScaleMetrics value, Context context) throws Exception {
                System.out.println("--resukt:      -"+value+"-----");
            }
        });
    }
}
